import pandas as pd
import plotly.express as px
import plotly.io as pio
pio.renderers.default = "svg"
from pyspark.sql import SparkSession
import re
import numpy as np
import plotly.graph_objects as go
from pyspark.sql.functions import col, split, explode, regexp_replace, transform, when
from pyspark.sql import functions as F
from pyspark.sql.functions import col, monotonically_increasing_id
# Set random seed
np.random.seed(42)
# Change Plotly renderer for notebooks
pio.renderers.default = "notebook"
Assignment 03
1 Import Packages
2 Load Dataset
# Create (or reuse) the Spark session used throughout this analysis
spark = SparkSession.builder.appName("LightcastData").getOrCreate()

# Read the job-postings CSV with a header row, inferred column types,
# multi-line field support, and '"' as the escape character
df = (
    spark.read
    .options(header="true", inferSchema="true", multiLine="true", escape="\"")
    .csv("/home/ubuntu/assignment-03-Sabrina1211/data/lightcast_job_postings.csv")
)
# Show Schema and Sample Data
#print("---This is Diagnostic check, No need to print it in the final doc---")
#df.printSchema() # comment this line when rendering the submission
#df.show(5)
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/09/21 00:10:17 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
[Stage 1:> (0 + 1) / 1]
3 Data Preparation
# Step 1: Cast every salary and experience column to float, one column at a time
for numeric_col in (
    "SALARY",
    "SALARY_FROM",
    "SALARY_TO",
    "MIN_YEARS_EXPERIENCE",
    "MAX_YEARS_EXPERIENCE",
):
    df = df.withColumn(numeric_col, col(numeric_col).cast("float"))
# Step 2: Computing medians for salary columns
def compute_median(sdf, col_name, relative_error=0.01):
    """Return the approximate median of one column.

    Args:
        sdf: Object exposing ``approxQuantile(col, probabilities,
            relativeError)``; in this script it is the Spark DataFrame.
        col_name: Name of the column whose median is requested.
        relative_error: Forwarded to ``approxQuantile`` as its relative-error
            argument. Defaults to 0.01, the value previously hard-coded here.

    Returns:
        The first element of the list returned by ``approxQuantile`` for the
        0.5 quantile, or ``None`` when that list is empty.
    """
    quantiles = sdf.approxQuantile(col_name, [0.5], relative_error)
    # An empty result means no quantile could be computed; report that as None
    return quantiles[0] if quantiles else None
# Approximate medians of the three salary columns, in the order
# SALARY_FROM, SALARY_TO, SALARY (used below for imputation)
median_from, median_to, median_salary = (
    compute_median(df, salary_col)
    for salary_col in ("SALARY_FROM", "SALARY_TO", "SALARY")
)
print("Medians:", median_from, median_to, median_salary)
# Step 4: Replace missing salary values with their column medians;
# the experience columns are deliberately not imputed
df = df.na.fill(
    dict(
        SALARY_FROM=median_from,
        SALARY_TO=median_to,
        SALARY=median_salary,
    )
)
# Step 5: Average_Salary is the midpoint of SALARY_FROM and SALARY_TO
df = df.withColumn(
    "Average_Salary",
    (col("SALARY_FROM") + col("SALARY_TO")) / 2,
)
# Step 6: Columns kept in the cleaned export
export_cols = [
    "EDUCATION_LEVELS_NAME",
    "REMOTE_TYPE_NAME",
    "MAX_YEARS_EXPERIENCE",
    "Average_Salary",
    "SALARY",
    "LOT_V6_SPECIALIZED_OCCUPATION_NAME",
]
df_selected = df.select(export_cols)
# Step 7: Collect the selection into pandas and write it out as CSV
# (without the pandas index column)
pdf = df_selected.toPandas()
pdf.to_csv("./data/lightcast_cleaned.csv", index=False)
print("Data cleaning complete. Rows retained:", len(pdf))
[Stage 2:> (0 + 1) / 1] [Stage 3:> (0 + 1) / 1] [Stage 4:> (0 + 1) / 1]
Medians: 87295.0 130042.0 115024.0
[Stage 5:> (0 + 1) / 1]
Data cleaning complete. Rows retained: 72498
4 Salary Distribution by Industry and Employment Type
# Question 1: collect employment type and salary into pandas, keeping only
# rows whose salary (as a double) is strictly positive.
# NOTE(review): SALARY nulls were already replaced by the median in the
# imputation step, so this filter can only drop non-positive values, not
# originally-missing ones — confirm that is the intended population.
pdf = (
    df.withColumn("SALARY", col("SALARY").cast("double"))
    .where(col("SALARY") > 0)
    .select("EMPLOYMENT_TYPE_NAME", "SALARY")
    .toPandas()
)
# Tidy the employment-type labels for display: coerce to pandas strings,
# label missing values "Unknown", drop non-ASCII characters, trim whitespace
employment_labels = pdf["EMPLOYMENT_TYPE_NAME"].astype("string").fillna("Unknown")
employment_labels = employment_labels.str.replace(r"[^\x00-\x7F]+", "", regex=True)
pdf["EMPLOYMENT_TYPE_NAME"] = employment_labels.str.strip()
# Median salary per employment type; its descending order drives the
# left-to-right order of the boxes
median_salaries = pdf["SALARY"].groupby(pdf["EMPLOYMENT_TYPE_NAME"]).median()
sorted_employment_types = median_salaries.sort_values(ascending=False).index
# Store the column as an ordered categorical that follows that ranking
pdf["EMPLOYMENT_TYPE_NAME"] = pd.Categorical(
    pdf["EMPLOYMENT_TYPE_NAME"], categories=sorted_employment_types, ordered=True
)
# Box plot of salary for each employment type
fig = px.box(
    data_frame=pdf,
    x="EMPLOYMENT_TYPE_NAME",
    y="SALARY",
    title="Salary Distribution by Employment Type",
    points="all",  # draws every observation beside its box, not only outliers
    boxmode="group",
    color_discrete_sequence=["black"],  # one neutral colour for all boxes
)
# Style fragments shared by the title, axis titles and tick labels
bold_arial = dict(family="Arial", color="black", weight="bold")
# Black, 2px, mirrored axis line used on both axes
axis_frame = dict(showline=True, linewidth=2, linecolor="black", mirror=True)

# Apply fonts, axis framing, tick labels and canvas size
fig.update_layout(
    title=dict(
        text="Salary Distribution by Employment Type",
        font=dict(size=30, **bold_arial),
    ),
    xaxis=dict(
        title=dict(text="Employment Type", font=dict(size=24, **bold_arial)),
        tickangle=0,  # keep the category labels horizontal
        tickfont=dict(size=18, **bold_arial),
        showgrid=False,  # no vertical grid lines
        # Force the category order computed from the median salaries
        categoryorder="array",
        categoryarray=sorted_employment_types.tolist(),
        **axis_frame,
    ),
    yaxis=dict(
        title=dict(text="Salary (K $)", font=dict(size=24, **bold_arial)),
        # Ticks every 50,000 from 0 to 500,000, labelled "0", "50K", ... "500K"
        tickvals=list(range(0, 500001, 50000)),
        ticktext=["0"] + [f"{thousands}K" for thousands in range(50, 501, 50)],
        tickfont=dict(size=18, **bold_arial),
        # Thin light-gray horizontal grid lines
        showgrid=True,
        gridcolor="lightgray",
        gridwidth=0.5,
        **axis_frame,
    ),
    font=dict(family="Arial", size=16, color="black"),
    boxgap=0.7,
    plot_bgcolor="white",
    paper_bgcolor="white",
    showlegend=False,
    height=500,
    width=850,
)
# Show the figure
fig.show()
[Stage 6:> (0 + 1) / 1]